[ 
https://issues.apache.org/jira/browse/HIVE-20825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Zeng updated HIVE-20825:
----------------------------
    Description: 
When using Hive ACID Merge (supported with the ORC format) to update/insert 
data, bucket files of 0 bytes or 3 bytes (the file content is just the three 
characters "ORC") are generated during MERGE INTO operations that finish with 
no errors. Subsequent queries on the base table then fail with a "Not a valid 
ORC file" error.

 

The following script can be used to reproduce the issue (note that with a small 
amount of data like this, increasing the number of buckets may allow the query 
to work, but with a large data set it fails regardless of the number of buckets):

set hive.auto.convert.join=false;
 set hive.enforce.bucketing=true;
 set hive.exec.dynamic.partition.mode = nonstrict;
 set hive.support.concurrency=true;
 set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

drop table if exists mergedelta_txt_1;
 drop table if exists mergedelta_txt_2;

CREATE TABLE mergedelta_txt_1 (
 id_str varchar(12), time_key int, value bigint)
 PARTITIONED BY (date_key int)
 ROW FORMAT DELIMITED
 STORED AS TEXTFILE;

CREATE TABLE mergedelta_txt_2 (
 id_str varchar(12), time_key int, value bigint)
 PARTITIONED BY (date_key int)
 ROW FORMAT DELIMITED
 STORED AS TEXTFILE;

INSERT INTO TABLE mergedelta_txt_1
 partition(date_key=20170103)
 VALUES
 ("AB94LIENR0",46700,12345676836978),
 ("AB94LIENR1",46825,12345676836978),
 ("AB94LIENS0",46709,12345676836978),
 ("AB94LIENS1",46834,12345676836978),
 ("AB94LIENT0",46709,12345676836978),
 ("AB94LIENT1",46834,12345676836978),
 ("AB94LIENU0",46718,12345676836978),
 ("AB94LIENU1",46844,12345676836978),
 ("AB94LIENV0",46719,12345676836978),
 ("AB94LIENV1",46844,12345676836978),
 ("AB94LIENW0",46728,12345676836978),
 ("AB94LIENW1",46854,12345676836978),
 ("AB94LIENX0",46728,12345676836978),
 ("AB94LIENX1",46854,12345676836978),
 ("AB94LIENY0",46737,12345676836978),
 ("AB94LIENY1",46863,12345676836978),
 ("AB94LIENZ0",46738,12345676836978),
 ("AB94LIENZ1",46863,12345676836978),
 ("AB94LIERA0",47176,12345676836982),
 ("AB94LIERA1",47302,12345676836982);

INSERT INTO TABLE mergedelta_txt_2
 partition(date_key=20170103)
 VALUES 
 ("AB94LIENT1",46834,12345676836978),
 ("AB94LIENU0",46718,12345676836978),
 ("AB94LIENU1",46844,12345676836978),
 ("AB94LIENV0",46719,12345676836978),
 ("AB94LIENV1",46844,12345676836978),
 ("AB94LIENW0",46728,12345676836978),
 ("AB94LIENW1",46854,12345676836978),
 ("AB94LIENX0",46728,12345676836978),
 ("AB94LIENX1",46854,12345676836978),
 ("AB94LIENY0",46737,12345676836978),
 ("AB94LIENY1",46863,12345676836978),
 ("AB94LIENZ0",46738,12345676836978),
 ("AB94LIENZ1",46863,12345676836978),
 ("AB94LIERA0",47176,12345676836982),
 ("AB94LIERA1",47302,12345676836982),
 ("AB94LIERA2",47418,12345676836982),
 ("AB94LIERB0",47176,12345676836982),
 ("AB94LIERB1",47302,12345676836982),
 ("AB94LIERB2",47418,12345676836982),
 ("AB94LIERC0",47185,12345676836982);

DROP TABLE IF EXISTS mergebase_1;
 CREATE TABLE mergebase_1 (
 id_str varchar(12) , time_key int , value bigint)
 PARTITIONED BY (date_key int)
 CLUSTERED BY (id_str,time_key) INTO 4 BUCKETS
 STORED AS ORC
 TBLPROPERTIES (
 'orc.compress'='SNAPPY',
 'pk_columns'='id_str,date_key,time_key',
 'NO_AUTO_COMPACTION'='true',
 'transactional'='true');

MERGE INTO mergebase_1 AS base
 USING (SELECT * 
 FROM (
 SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY 
id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk 
 FROM mergedelta_txt_1
 DISTRIBUTE BY date_key
 ) rankedtbl 
 WHERE rankedtbl.rk=1
 ) AS delta
 ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND 
delta.time_key=base.time_key
 WHEN MATCHED THEN UPDATE SET value=delta.value
 WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , 
delta.value, delta.date_key);

MERGE INTO mergebase_1 AS base
 USING (SELECT * 
 FROM (
 SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY 
id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk 
 FROM mergedelta_txt_2
 DISTRIBUTE BY date_key
 ) rankedtbl 
 WHERE rankedtbl.rk=1
 ) AS delta
 ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND 
delta.time_key=base.time_key
 WHEN MATCHED THEN UPDATE SET value=delta.value
 WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , 
delta.value, delta.date_key);

select count(*) from mergebase_1;

  was:
When using Hive ACID Merge (supported with the ORC format) to update/insert 
data, bucket files of 0 bytes or 3 bytes (the file content is just the three 
characters "ORC") are generated during MERGE INTO operations that finish with 
no errors. Subsequent queries on the base table then fail with a "Not a valid 
ORC file" error.

 

The following script can be used to reproduce the issue:

set hive.auto.convert.join=false;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.support.concurrency=true;
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

drop table if exists mergedelta_txt_1;
drop table if exists mergedelta_txt_2;

CREATE TABLE mergedelta_txt_1 (
id_str varchar(12), time_key int, value bigint)
PARTITIONED BY (date_key int)
ROW FORMAT DELIMITED
STORED AS TEXTFILE;

CREATE TABLE mergedelta_txt_2 (
id_str varchar(12), time_key int, value bigint)
PARTITIONED BY (date_key int)
ROW FORMAT DELIMITED
STORED AS TEXTFILE;

INSERT INTO TABLE mergedelta_txt_1
partition(date_key=20170103)
VALUES
 ("AB94LIENR0",46700,12345676836978),
 ("AB94LIENR1",46825,12345676836978),
 ("AB94LIENS0",46709,12345676836978),
 ("AB94LIENS1",46834,12345676836978),
 ("AB94LIENT0",46709,12345676836978),
 ("AB94LIENT1",46834,12345676836978),
 ("AB94LIENU0",46718,12345676836978),
 ("AB94LIENU1",46844,12345676836978),
 ("AB94LIENV0",46719,12345676836978),
 ("AB94LIENV1",46844,12345676836978),
 ("AB94LIENW0",46728,12345676836978),
 ("AB94LIENW1",46854,12345676836978),
 ("AB94LIENX0",46728,12345676836978),
 ("AB94LIENX1",46854,12345676836978),
 ("AB94LIENY0",46737,12345676836978),
 ("AB94LIENY1",46863,12345676836978),
 ("AB94LIENZ0",46738,12345676836978),
 ("AB94LIENZ1",46863,12345676836978),
 ("AB94LIERA0",47176,12345676836982),
 ("AB94LIERA1",47302,12345676836982);

INSERT INTO TABLE mergedelta_txt_2
partition(date_key=20170103)
VALUES 
 ("AB94LIENT1",46834,12345676836978),
 ("AB94LIENU0",46718,12345676836978),
 ("AB94LIENU1",46844,12345676836978),
 ("AB94LIENV0",46719,12345676836978),
 ("AB94LIENV1",46844,12345676836978),
 ("AB94LIENW0",46728,12345676836978),
 ("AB94LIENW1",46854,12345676836978),
 ("AB94LIENX0",46728,12345676836978),
 ("AB94LIENX1",46854,12345676836978),
 ("AB94LIENY0",46737,12345676836978),
 ("AB94LIENY1",46863,12345676836978),
 ("AB94LIENZ0",46738,12345676836978),
 ("AB94LIENZ1",46863,12345676836978),
 ("AB94LIERA0",47176,12345676836982),
 ("AB94LIERA1",47302,12345676836982),
 ("AB94LIERA2",47418,12345676836982),
 ("AB94LIERB0",47176,12345676836982),
 ("AB94LIERB1",47302,12345676836982),
 ("AB94LIERB2",47418,12345676836982),
 ("AB94LIERC0",47185,12345676836982);

DROP TABLE IF EXISTS mergebase_1;
CREATE TABLE mergebase_1 (
id_str varchar(12) , time_key int , value bigint)
PARTITIONED BY (date_key int)
CLUSTERED BY (id_str,time_key) INTO 32 BUCKETS
STORED AS ORC
TBLPROPERTIES (
 'orc.compress'='SNAPPY',
 'pk_columns'='id_str,date_key,time_key',
 'NO_AUTO_COMPACTION'='true',
 'transactional'='true');

MERGE INTO mergebase_1 AS base
USING (SELECT * 
 FROM (
 SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY 
id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk 
 FROM mergedelta_txt_1
 DISTRIBUTE BY date_key
 ) rankedtbl 
 WHERE rankedtbl.rk=1
) AS delta
ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND 
delta.time_key=base.time_key
WHEN MATCHED THEN UPDATE SET value=delta.value
WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , 
delta.value, delta.date_key);

MERGE INTO mergebase_1 AS base
USING (SELECT * 
 FROM (
 SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY 
id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk 
 FROM mergedelta_txt_2
 DISTRIBUTE BY date_key
 ) rankedtbl 
 WHERE rankedtbl.rk=1
) AS delta
ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND 
delta.time_key=base.time_key
WHEN MATCHED THEN UPDATE SET value=delta.value
WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , 
delta.value, delta.date_key);

select count(*) from mergebase_1;


> Hive ACID Merge generates invalid ORC files (bucket files 0 or 3 bytes in 
> length) causing the "Not a valid ORC file" error
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-20825
>                 URL: https://issues.apache.org/jira/browse/HIVE-20825
>             Project: Hive
>          Issue Type: Bug
>          Components: Hive, ORC, Transactions
>    Affects Versions: 2.2.0, 2.3.1, 2.3.2
>         Environment: Hive 2.3.x on Amazon EMR 5.8.0 to 5.18.0
>            Reporter: Tom Zeng
>            Assignee: Eugene Koifman
>            Priority: Major
>         Attachments: hive-merge-invalid-orc-repro.hql, 
> hive-merge-invalid-orc-repro.log
>
>
> When using Hive ACID Merge (supported with the ORC format) to update/insert 
> data, bucket files of 0 bytes or 3 bytes (the file content is just the three 
> characters "ORC") are generated during MERGE INTO operations that finish with 
> no errors. Subsequent queries on the base table then fail with a "Not a valid 
> ORC file" error.
>  
> The following script can be used to reproduce the issue (note that with a small 
> amount of data like this, increasing the number of buckets may allow the query 
> to work, but with a large data set it fails regardless of the number of buckets):
> set hive.auto.convert.join=false;
>  set hive.enforce.bucketing=true;
>  set hive.exec.dynamic.partition.mode = nonstrict;
>  set hive.support.concurrency=true;
>  set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
> drop table if exists mergedelta_txt_1;
>  drop table if exists mergedelta_txt_2;
> CREATE TABLE mergedelta_txt_1 (
>  id_str varchar(12), time_key int, value bigint)
>  PARTITIONED BY (date_key int)
>  ROW FORMAT DELIMITED
>  STORED AS TEXTFILE;
> CREATE TABLE mergedelta_txt_2 (
>  id_str varchar(12), time_key int, value bigint)
>  PARTITIONED BY (date_key int)
>  ROW FORMAT DELIMITED
>  STORED AS TEXTFILE;
> INSERT INTO TABLE mergedelta_txt_1
>  partition(date_key=20170103)
>  VALUES
>  ("AB94LIENR0",46700,12345676836978),
>  ("AB94LIENR1",46825,12345676836978),
>  ("AB94LIENS0",46709,12345676836978),
>  ("AB94LIENS1",46834,12345676836978),
>  ("AB94LIENT0",46709,12345676836978),
>  ("AB94LIENT1",46834,12345676836978),
>  ("AB94LIENU0",46718,12345676836978),
>  ("AB94LIENU1",46844,12345676836978),
>  ("AB94LIENV0",46719,12345676836978),
>  ("AB94LIENV1",46844,12345676836978),
>  ("AB94LIENW0",46728,12345676836978),
>  ("AB94LIENW1",46854,12345676836978),
>  ("AB94LIENX0",46728,12345676836978),
>  ("AB94LIENX1",46854,12345676836978),
>  ("AB94LIENY0",46737,12345676836978),
>  ("AB94LIENY1",46863,12345676836978),
>  ("AB94LIENZ0",46738,12345676836978),
>  ("AB94LIENZ1",46863,12345676836978),
>  ("AB94LIERA0",47176,12345676836982),
>  ("AB94LIERA1",47302,12345676836982);
> INSERT INTO TABLE mergedelta_txt_2
>  partition(date_key=20170103)
>  VALUES 
>  ("AB94LIENT1",46834,12345676836978),
>  ("AB94LIENU0",46718,12345676836978),
>  ("AB94LIENU1",46844,12345676836978),
>  ("AB94LIENV0",46719,12345676836978),
>  ("AB94LIENV1",46844,12345676836978),
>  ("AB94LIENW0",46728,12345676836978),
>  ("AB94LIENW1",46854,12345676836978),
>  ("AB94LIENX0",46728,12345676836978),
>  ("AB94LIENX1",46854,12345676836978),
>  ("AB94LIENY0",46737,12345676836978),
>  ("AB94LIENY1",46863,12345676836978),
>  ("AB94LIENZ0",46738,12345676836978),
>  ("AB94LIENZ1",46863,12345676836978),
>  ("AB94LIERA0",47176,12345676836982),
>  ("AB94LIERA1",47302,12345676836982),
>  ("AB94LIERA2",47418,12345676836982),
>  ("AB94LIERB0",47176,12345676836982),
>  ("AB94LIERB1",47302,12345676836982),
>  ("AB94LIERB2",47418,12345676836982),
>  ("AB94LIERC0",47185,12345676836982);
> DROP TABLE IF EXISTS mergebase_1;
>  CREATE TABLE mergebase_1 (
>  id_str varchar(12) , time_key int , value bigint)
>  PARTITIONED BY (date_key int)
>  CLUSTERED BY (id_str,time_key) INTO 4 BUCKETS
>  STORED AS ORC
>  TBLPROPERTIES (
>  'orc.compress'='SNAPPY',
>  'pk_columns'='id_str,date_key,time_key',
>  'NO_AUTO_COMPACTION'='true',
>  'transactional'='true');
> MERGE INTO mergebase_1 AS base
>  USING (SELECT * 
>  FROM (
>  SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY 
> id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk 
>  FROM mergedelta_txt_1
>  DISTRIBUTE BY date_key
>  ) rankedtbl 
>  WHERE rankedtbl.rk=1
>  ) AS delta
>  ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND 
> delta.time_key=base.time_key
>  WHEN MATCHED THEN UPDATE SET value=delta.value
>  WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , 
> delta.value, delta.date_key);
> MERGE INTO mergebase_1 AS base
>  USING (SELECT * 
>  FROM (
>  SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY 
> id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk 
>  FROM mergedelta_txt_2
>  DISTRIBUTE BY date_key
>  ) rankedtbl 
>  WHERE rankedtbl.rk=1
>  ) AS delta
>  ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND 
> delta.time_key=base.time_key
>  WHEN MATCHED THEN UPDATE SET value=delta.value
>  WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , 
> delta.value, delta.date_key);
> select count(*) from mergebase_1;



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to