[ https://issues.apache.org/jira/browse/HIVE-27734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834642#comment-17834642 ]
Shohei Okumiya edited comment on HIVE-27734 at 4/7/24 1:16 PM:
---------------------------------------------------------------

Thanks. I have some quick updates.

* What if the partition spec evolves?
** As Denys says, it sounds reasonable to leverage compaction with repartitioning.
** As far as I have tested Spark's SPJ, it will likely fall back to a shuffle join in that case as well.
* Should we extend Bucket Merge Join for partition transform types other than `bucket[N]`?
** It is highly likely that we will need to update the BMJ side.
** We may not need to resolve every case in the first step. I can't come up with as many use cases for joining tables on timestamp-like columns as on bucketed columns. However, it would be nice if we could abstract the StorageHandler APIs well.

I plan to draft a design doc for partition-aware optimization.

> Add Iceberg's storage-partitioned join capabilities to Hive's [sorted-]bucket-map-join
> --------------------------------------------------------------------------------------
>
>                 Key: HIVE-27734
>                 URL: https://issues.apache.org/jira/browse/HIVE-27734
>             Project: Hive
>          Issue Type: Improvement
>          Components: Iceberg integration
>    Affects Versions: 4.0.0-alpha-2
>            Reporter: Janos Kovacs
>            Assignee: Shohei Okumiya
>            Priority: Major
>
> Iceberg's 'data bucketing' is implemented through its rich (function-based)
> partitioning feature, which helps to optimize join operations; such joins
> are called storage-partitioned joins.
> doc:
> [https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit#heading=h.82w8qxfl2uwl]
> spark impl.: https://issues.apache.org/jira/browse/SPARK-37375
> Hive does not yet leverage this feature in its bucket-map-join
> optimization, neither on its own nor combined with Iceberg's SortOrder for
> sorted-bucket-map-join.
> Customers migrating large tables with a storage-optimized schema from the
> Hive table format to the Iceberg format will experience performance
> degradation: the gain from Iceberg not having to list files is
> significantly smaller than the join performance lost by no longer getting
> a bucket join, or even a sorted-bucket join.
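The issue description relies on Iceberg's `bucket[N]` partition transform placing equal join keys in the same bucket on both sides. Below is a minimal sketch of why that enables a bucket-by-bucket join. The Python implements the transform as the Iceberg table spec defines it for `int`/`long` values: a 32-bit Murmur3 (x86) hash with seed 0 over the 8-byte little-endian value, then `(hash & Integer.MAX_VALUE) % N`. It then joins two toy tables one bucket pair at a time. The helper names (`murmur3_x86_32`, `iceberg_bucket_int`, `bucketed_inner_join`) are hypothetical and are not part of Hive or Iceberg.

```python
import struct

MASK32 = 0xFFFFFFFF


def _rotl32(x: int, r: int) -> int:
    """Rotate a 32-bit unsigned integer left by r bits."""
    return ((x << r) | (x >> (32 - r))) & MASK32


def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 (x86, 32-bit), returned as a signed 32-bit int like Java."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & MASK32
    n_blocks = len(data) // 4
    # Body: mix each 4-byte little-endian block into the hash state.
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = _rotl32((k * c1) & MASK32, 15)
        k = (k * c2) & MASK32
        h = _rotl32(h ^ k, 13)
        h = (h * 5 + 0xE6546B64) & MASK32
    # Tail: mix the remaining 0-3 bytes.
    tail = data[4 * n_blocks:]
    k = 0
    if len(tail) >= 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = _rotl32((k * c1) & MASK32, 15)
        k = (k * c2) & MASK32
        h ^= k
    # Finalization (fmix32): make every input bit affect every output bit.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    return h - (1 << 32) if h & 0x80000000 else h


def iceberg_bucket_int(value: int, num_buckets: int) -> int:
    """bucket[N] for an int/long value: hash 8-byte little-endian, then mod N."""
    h = murmur3_x86_32(struct.pack("<q", value))
    return (h & 0x7FFFFFFF) % num_buckets


def bucketed_inner_join(left, right, num_buckets: int):
    """Inner-join (key, value) rows bucket by bucket, never across buckets."""
    def split(rows):
        buckets = [[] for _ in range(num_buckets)]
        for key, val in rows:
            buckets[iceberg_bucket_int(key, num_buckets)].append((key, val))
        return buckets

    result = []
    # Bucket i of the left side only ever needs bucket i of the right side.
    for left_bucket, right_bucket in zip(split(left), split(right)):
        for lk, lv in left_bucket:
            for rk, rv in right_bucket:
                if lk == rk:
                    result.append((lk, lv, rv))
    return sorted(result)
```

With data shaped like the Hive tables in the repro (ids 1 to 8 on one side, 1 to 4 on the other) and N = 8, `bucketed_inner_join` returns the same four rows as a naive nested-loop join, even though it never compares rows from different buckets. The sketch does not assume that Hive's own `CLUSTERED BY` bucketing computes the same bucket ids.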
>
> {noformat}
> SET hive.query.results.cache.enabled=false;
> SET hive.fetch.task.conversion=none;
> SET hive.optimize.bucketmapjoin=true;
> SET hive.convert.join.bucket.mapjoin.tez=true;
> SET hive.auto.convert.join.noconditionaltask.size=1000;
> -- If you are working with external tables, you need this for BMJ:
> SET hive.disable.unsafe.external.table.operations=false;
> -- HIVE BUCKET-MAP-JOIN
> DROP TABLE IF EXISTS default.hivebmjt1 PURGE;
> DROP TABLE IF EXISTS default.hivebmjt2 PURGE;
> CREATE TABLE default.hivebmjt1 (id int, txt string) CLUSTERED BY (id) INTO 8 BUCKETS;
> CREATE TABLE default.hivebmjt2 (id int, txt string);
> INSERT INTO default.hivebmjt1 VALUES (1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6'),(7,'7'),(8,'8');
> INSERT INTO default.hivebmjt2 VALUES (1,'1'),(2,'2'),(3,'3'),(4,'4');
> EXPLAIN
> SELECT * FROM default.hivebmjt1 f INNER JOIN default.hivebmjt2 d ON f.id = d.id;
> EXPLAIN
> SELECT * FROM default.hivebmjt1 f LEFT OUTER JOIN default.hivebmjt2 d ON f.id = d.id;
> -- Both are optimized into BMJ
> -- ICEBERG BUCKET-MAP-JOIN via Iceberg's storage-partitioned join
> DROP TABLE IF EXISTS default.icespbmjt1 PURGE;
> DROP TABLE IF EXISTS default.icespbmjt2 PURGE;
> CREATE TABLE default.icespbmjt1 (txt string) PARTITIONED BY (id int) STORED BY ICEBERG;
> CREATE TABLE default.icespbmjt2 (txt string) PARTITIONED BY (id int) STORED BY ICEBERG;
> INSERT INTO default.icespbmjt1 VALUES ('1',1),('2',2),('3',3),('4',4);
> INSERT INTO default.icespbmjt2 VALUES ('1',1),('2',2),('3',3),('4',4);
> EXPLAIN
> SELECT * FROM default.icespbmjt1 f INNER JOIN default.icespbmjt2 d ON f.id = d.id;
> EXPLAIN
> SELECT * FROM default.icespbmjt1 f LEFT OUTER JOIN default.icespbmjt2 d ON f.id = d.id;
> -- Only optimized into a plain map join (not BMJ)
> {noformat}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
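The comment's question about evolved partition specs and the repro's identity-partitioned Iceberg tables can be illustrated with a toy planner. This is a minimal sketch that assumes a simplified, hypothetical model of table metadata: `DataFile`, `Table`, and spec strings such as `"identity(id)"` are illustrative and are not Iceberg or Hive APIs. It groups each side's data files by partition tuple and pairs matching groups, which is the core idea behind a storage-partitioned join. If the two sides are partitioned differently, or if any file was written under an older spec, it returns `None`. That stands in for falling back to a shuffle join.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class DataFile:
    """Hypothetical, simplified metadata of one data file."""
    path: str
    spec_id: int      # id of the partition spec the file was written with
    partition: Tuple  # partition tuple under that spec, e.g. (4,) for id=4


@dataclass
class Table:
    """Hypothetical table model: spec id -> partition fields, plus its files."""
    specs: Dict[int, Tuple[str, ...]]
    current_spec_id: int
    files: List[DataFile]


def group_by_partition(table: Table) -> Optional[Dict[Tuple, List[str]]]:
    """Group file paths by partition tuple under the table's current spec.

    Returns None if any file was written under another (older) spec, because
    its partition tuple cannot be compared with the others."""
    groups: Dict[Tuple, List[str]] = defaultdict(list)
    for f in table.files:
        if f.spec_id != table.current_spec_id:
            return None
        groups[f.partition].append(f.path)
    return dict(groups)


def plan_partitioned_join(
    left: Table, right: Table, left_outer: bool = False
) -> Optional[List[Tuple[Tuple, List[str], List[str]]]]:
    """Pair left and right file groups that share a partition tuple.

    Each (partition, left_paths, right_paths) task can be joined on its own,
    without a shuffle. None means: fall back to a shuffle join."""
    # Both sides must use identical partition fields (assumed to be the join key).
    if left.specs[left.current_spec_id] != right.specs[right.current_spec_id]:
        return None
    left_groups = group_by_partition(left)
    right_groups = group_by_partition(right)
    if left_groups is None or right_groups is None:
        return None
    # Inner join: only partitions present on both sides can produce rows.
    # Left outer join: keep every left partition, possibly with no right files.
    keys = set(left_groups) if left_outer else set(left_groups) & set(right_groups)
    return [(k, left_groups[k], right_groups.get(k, [])) for k in sorted(keys)]
```

Suppose a table that was first unpartitioned (spec 0) and later partitioned by `identity(id)` (spec 1) still has spec-0 files. Then `plan_partitioned_join` returns `None`. Once those files are rewritten into the current spec, which is what the compaction-with-repartitioning idea would do, the same call yields one task per shared partition.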