[ https://issues.apache.org/jira/browse/HIVE-20825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tom Zeng updated HIVE-20825: ---------------------------- Description: When using Hive ACID Merge (supported with the ORC format) to update/insert data, bucket files with 0 byte or 3 bytes (file content is three character: ORC) are generated during MERGE INTO operations which finish with no errors. Subsequent queries on the base table will get "Not a valid ORC file" error. The following script can be used to reproduce the issue(note that with small amount of data like this increase the number of bucket could result in query to work, but with large data set it will fail no matter what bucket size): set hive.auto.convert.join=false; set hive.enforce.bucketing=true; set hive.exec.dynamic.partition.mode = nonstrict; set hive.support.concurrency=true; set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; drop table if exists mergedelta_txt_1; drop table if exists mergedelta_txt_2; CREATE TABLE mergedelta_txt_1 ( id_str varchar(12), time_key int, value bigint) PARTITIONED BY (date_key int) ROW FORMAT DELIMITED STORED AS TEXTFILE; CREATE TABLE mergedelta_txt_2 ( id_str varchar(12), time_key int, value bigint) PARTITIONED BY (date_key int) ROW FORMAT DELIMITED STORED AS TEXTFILE; INSERT INTO TABLE mergedelta_txt_1 partition(date_key=20170103) VALUES ("AB94LIENR0",46700,12345676836978), ("AB94LIENR1",46825,12345676836978), ("AB94LIENS0",46709,12345676836978), ("AB94LIENS1",46834,12345676836978), ("AB94LIENT0",46709,12345676836978), ("AB94LIENT1",46834,12345676836978), ("AB94LIENU0",46718,12345676836978), ("AB94LIENU1",46844,12345676836978), ("AB94LIENV0",46719,12345676836978), ("AB94LIENV1",46844,12345676836978), ("AB94LIENW0",46728,12345676836978), ("AB94LIENW1",46854,12345676836978), ("AB94LIENX0",46728,12345676836978), ("AB94LIENX1",46854,12345676836978), ("AB94LIENY0",46737,12345676836978), ("AB94LIENY1",46863,12345676836978), ("AB94LIENZ0",46738,12345676836978), ("AB94LIENZ1",46863,12345676836978), ("AB94LIERA0",47176,12345676836982), ("AB94LIERA1",47302,12345676836982); INSERT INTO TABLE mergedelta_txt_2 partition(date_key=20170103) VALUES ("AB94LIENT1",46834,12345676836978), ("AB94LIENU0",46718,12345676836978), ("AB94LIENU1",46844,12345676836978), ("AB94LIENV0",46719,12345676836978), ("AB94LIENV1",46844,12345676836978), ("AB94LIENW0",46728,12345676836978), ("AB94LIENW1",46854,12345676836978), ("AB94LIENX0",46728,12345676836978), ("AB94LIENX1",46854,12345676836978), ("AB94LIENY0",46737,12345676836978), ("AB94LIENY1",46863,12345676836978), ("AB94LIENZ0",46738,12345676836978), ("AB94LIENZ1",46863,12345676836978), ("AB94LIERA0",47176,12345676836982), ("AB94LIERA1",47302,12345676836982), ("AB94LIERA2",47418,12345676836982), ("AB94LIERB0",47176,12345676836982), ("AB94LIERB1",47302,12345676836982), ("AB94LIERB2",47418,12345676836982), ("AB94LIERC0",47185,12345676836982); DROP TABLE IF EXISTS mergebase_1; CREATE TABLE mergebase_1 ( id_str varchar(12) , time_key int , value bigint) PARTITIONED BY (date_key int) CLUSTERED BY (id_str,time_key) INTO 4 BUCKETS STORED AS ORC TBLPROPERTIES ( 'orc.compress'='SNAPPY', 'pk_columns'='id_str,date_key,time_key', 'NO_AUTO_COMPACTION'='true', 'transactional'='true'); MERGE INTO mergebase_1 AS base USING (SELECT * FROM ( SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk FROM mergedelta_txt_1 DISTRIBUTE BY date_key ) rankedtbl WHERE rankedtbl.rk=1 ) AS delta ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND delta.time_key=base.time_key WHEN MATCHED THEN UPDATE SET value=delta.value WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , delta.value, delta.date_key); MERGE INTO mergebase_1 AS base USING (SELECT * FROM ( SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk FROM mergedelta_txt_2 DISTRIBUTE BY date_key ) rankedtbl WHERE rankedtbl.rk=1 ) AS delta ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND delta.time_key=base.time_key WHEN MATCHED THEN UPDATE SET value=delta.value WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , delta.value, delta.date_key); select count(*) from mergebase_1; was: When using Hive ACID Merge (supported with the ORC format) to update/insert data, bucket files with 0 byte or 3 bytes (file content is three character: ORC) are generated during MERGE INTO operations which finish with no errors. Subsequent queries on the base table will get "Not a valid ORC file" error. The following script can be used to reproduce the issue: set hive.auto.convert.join=false; set hive.enforce.bucketing=true; set hive.exec.dynamic.partition.mode = nonstrict; set hive.support.concurrency=true; set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; drop table if exists mergedelta_txt_1; drop table if exists mergedelta_txt_2; CREATE TABLE mergedelta_txt_1 ( id_str varchar(12), time_key int, value bigint) PARTITIONED BY (date_key int) ROW FORMAT DELIMITED STORED AS TEXTFILE; CREATE TABLE mergedelta_txt_2 ( id_str varchar(12), time_key int, value bigint) PARTITIONED BY (date_key int) ROW FORMAT DELIMITED STORED AS TEXTFILE; INSERT INTO TABLE mergedelta_txt_1 partition(date_key=20170103) VALUES ("AB94LIENR0",46700,12345676836978), ("AB94LIENR1",46825,12345676836978), ("AB94LIENS0",46709,12345676836978), ("AB94LIENS1",46834,12345676836978), ("AB94LIENT0",46709,12345676836978), ("AB94LIENT1",46834,12345676836978), ("AB94LIENU0",46718,12345676836978), ("AB94LIENU1",46844,12345676836978), ("AB94LIENV0",46719,12345676836978), ("AB94LIENV1",46844,12345676836978), ("AB94LIENW0",46728,12345676836978), ("AB94LIENW1",46854,12345676836978), ("AB94LIENX0",46728,12345676836978), ("AB94LIENX1",46854,12345676836978), ("AB94LIENY0",46737,12345676836978), ("AB94LIENY1",46863,12345676836978), ("AB94LIENZ0",46738,12345676836978), ("AB94LIENZ1",46863,12345676836978), ("AB94LIERA0",47176,12345676836982), ("AB94LIERA1",47302,12345676836982); INSERT INTO TABLE mergedelta_txt_2 partition(date_key=20170103) VALUES ("AB94LIENT1",46834,12345676836978), ("AB94LIENU0",46718,12345676836978), ("AB94LIENU1",46844,12345676836978), ("AB94LIENV0",46719,12345676836978), ("AB94LIENV1",46844,12345676836978), ("AB94LIENW0",46728,12345676836978), ("AB94LIENW1",46854,12345676836978), ("AB94LIENX0",46728,12345676836978), ("AB94LIENX1",46854,12345676836978), ("AB94LIENY0",46737,12345676836978), ("AB94LIENY1",46863,12345676836978), ("AB94LIENZ0",46738,12345676836978), ("AB94LIENZ1",46863,12345676836978), ("AB94LIERA0",47176,12345676836982), ("AB94LIERA1",47302,12345676836982), ("AB94LIERA2",47418,12345676836982), ("AB94LIERB0",47176,12345676836982), ("AB94LIERB1",47302,12345676836982), ("AB94LIERB2",47418,12345676836982), ("AB94LIERC0",47185,12345676836982); DROP TABLE IF EXISTS mergebase_1; CREATE TABLE mergebase_1 ( id_str varchar(12) , time_key int , value bigint) PARTITIONED BY (date_key int) CLUSTERED BY (id_str,time_key) INTO 32 BUCKETS STORED AS ORC TBLPROPERTIES ( 'orc.compress'='SNAPPY', 'pk_columns'='id_str,date_key,time_key', 'NO_AUTO_COMPACTION'='true', 'transactional'='true'); MERGE INTO mergebase_1 AS base USING (SELECT * FROM ( SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk FROM mergedelta_txt_1 DISTRIBUTE BY date_key ) rankedtbl WHERE rankedtbl.rk=1 ) AS delta ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND delta.time_key=base.time_key WHEN MATCHED THEN UPDATE SET value=delta.value WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , delta.value, delta.date_key); MERGE INTO mergebase_1 AS base USING (SELECT * FROM ( SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk FROM mergedelta_txt_2 DISTRIBUTE BY date_key ) rankedtbl WHERE rankedtbl.rk=1 ) AS delta ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND delta.time_key=base.time_key WHEN MATCHED THEN UPDATE SET value=delta.value WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , delta.value, delta.date_key); select count(*) from mergebase_1; > Hive ACID Merge generates invalid ORC files (bucket files 0 or 3 bytes in > length) causing the "Not a valid ORC file" error > -------------------------------------------------------------------------------------------------------------------------- > > Key: HIVE-20825 > URL: https://issues.apache.org/jira/browse/HIVE-20825 > Project: Hive > Issue Type: Bug > Components: Hive, ORC, Transactions > Affects Versions: 2.2.0, 2.3.1, 2.3.2 > Environment: Hive 2.3.x on Amazon EMR 5.8.0 to 5.18.0 > Reporter: Tom Zeng > Assignee: Eugene Koifman > Priority: Major > Attachments: hive-merge-invalid-orc-repro.hql, > hive-merge-invalid-orc-repro.log > > > When using Hive ACID Merge (supported with the ORC format) to update/insert > data, bucket files with 0 byte or 3 bytes (file content is three character: > ORC) are generated during MERGE INTO operations which finish with no errors. > Subsequent queries on the base table will get "Not a valid ORC file" error. > > The following script can be used to reproduce the issue(note that with small > amount of data like this increase the number of bucket could result in query > to work, but with large data set it will fail no matter what bucket size): > set hive.auto.convert.join=false; > set hive.enforce.bucketing=true; > set hive.exec.dynamic.partition.mode = nonstrict; > set hive.support.concurrency=true; > set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; > drop table if exists mergedelta_txt_1; > drop table if exists mergedelta_txt_2; > CREATE TABLE mergedelta_txt_1 ( > id_str varchar(12), time_key int, value bigint) > PARTITIONED BY (date_key int) > ROW FORMAT DELIMITED > STORED AS TEXTFILE; > CREATE TABLE mergedelta_txt_2 ( > id_str varchar(12), time_key int, value bigint) > PARTITIONED BY (date_key int) > ROW FORMAT DELIMITED > STORED AS TEXTFILE; > INSERT INTO TABLE mergedelta_txt_1 > partition(date_key=20170103) > VALUES > ("AB94LIENR0",46700,12345676836978), > ("AB94LIENR1",46825,12345676836978), > ("AB94LIENS0",46709,12345676836978), > ("AB94LIENS1",46834,12345676836978), > ("AB94LIENT0",46709,12345676836978), > ("AB94LIENT1",46834,12345676836978), > ("AB94LIENU0",46718,12345676836978), > ("AB94LIENU1",46844,12345676836978), > ("AB94LIENV0",46719,12345676836978), > ("AB94LIENV1",46844,12345676836978), > ("AB94LIENW0",46728,12345676836978), > ("AB94LIENW1",46854,12345676836978), > ("AB94LIENX0",46728,12345676836978), > ("AB94LIENX1",46854,12345676836978), > ("AB94LIENY0",46737,12345676836978), > ("AB94LIENY1",46863,12345676836978), > ("AB94LIENZ0",46738,12345676836978), > ("AB94LIENZ1",46863,12345676836978), > ("AB94LIERA0",47176,12345676836982), > ("AB94LIERA1",47302,12345676836982); > INSERT INTO TABLE mergedelta_txt_2 > partition(date_key=20170103) > VALUES > ("AB94LIENT1",46834,12345676836978), > ("AB94LIENU0",46718,12345676836978), > ("AB94LIENU1",46844,12345676836978), > ("AB94LIENV0",46719,12345676836978), > ("AB94LIENV1",46844,12345676836978), > ("AB94LIENW0",46728,12345676836978), > ("AB94LIENW1",46854,12345676836978), > ("AB94LIENX0",46728,12345676836978), > ("AB94LIENX1",46854,12345676836978), > ("AB94LIENY0",46737,12345676836978), > ("AB94LIENY1",46863,12345676836978), > ("AB94LIENZ0",46738,12345676836978), > ("AB94LIENZ1",46863,12345676836978), > ("AB94LIERA0",47176,12345676836982), > ("AB94LIERA1",47302,12345676836982), > ("AB94LIERA2",47418,12345676836982), > ("AB94LIERB0",47176,12345676836982), > ("AB94LIERB1",47302,12345676836982), > ("AB94LIERB2",47418,12345676836982), > ("AB94LIERC0",47185,12345676836982); > DROP TABLE IF EXISTS mergebase_1; > CREATE TABLE mergebase_1 ( > id_str varchar(12) , time_key int , value bigint) > PARTITIONED BY (date_key int) > CLUSTERED BY (id_str,time_key) INTO 4 BUCKETS > STORED AS ORC > TBLPROPERTIES ( > 'orc.compress'='SNAPPY', > 'pk_columns'='id_str,date_key,time_key', > 'NO_AUTO_COMPACTION'='true', > 'transactional'='true'); > MERGE INTO mergebase_1 AS base > USING (SELECT * > FROM ( > SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY > id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk > FROM mergedelta_txt_1 > DISTRIBUTE BY date_key > ) rankedtbl > WHERE rankedtbl.rk=1 > ) AS delta > ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND > delta.time_key=base.time_key > WHEN MATCHED THEN UPDATE SET value=delta.value > WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , > delta.value, delta.date_key); > MERGE INTO mergebase_1 AS base > USING (SELECT * > FROM ( > SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY > id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk > FROM mergedelta_txt_2 > DISTRIBUTE BY date_key > ) rankedtbl > WHERE rankedtbl.rk=1 > ) AS delta > ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND > delta.time_key=base.time_key > WHEN MATCHED THEN UPDATE SET value=delta.value > WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key , > delta.value, delta.date_key); > select count(*) from mergebase_1; -- This message was sent by Atlassian JIRA (v7.6.3#76005)