Hi All,

Since there had been no bucket join in Spark DSv2 until storage-partitioned join was added in 3.4.0, we've implemented our own in Iceberg.
We found an issue when joining an Iceberg table with a Spark Parquet table, as follows:

1. The left side is an Iceberg table with hash distribution, clustered into two buckets. The right side is a Parquet table with no buckets or partitions.
2. When joining the two tables with SortMergeJoin and spark.sql.shuffle.partitions=2, there is no exchange on the left side, since Spark believes its distribution is already satisfied, while the right side is shuffled to match the number of shuffle partitions.
3. This simple test surprisingly fails, returning no joined rows.
4. I dug into it and found that the partition keys used when writing the Iceberg table and when shuffling the right side were different.
5. That led me to the hash functions in Iceberg <https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/util/BucketUtil.java#L34> and Spark <https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L585>. Both use Murmur3 hash functions, but at a glance they differ in several places: Iceberg uses the default seed 0 while Spark uses 42, and Iceberg hashes an integer by widening it to a long while Spark hashes the integer as-is.
6. Since there is no issue when joining two bucketed tables, the incorrect result in this case is most likely caused by the inconsistency between the Iceberg and Spark hash functions (a minimal sketch comparing the two schemes is in the P.S. below).

  @Test
  public void testBucketJoinOneSide() {
    String t1 = tableName + "_t1";
    String t2 = tableName + "_t2";

    sql("create table %s (id int, price int, seller_id int) " +
        "USING iceberg CLUSTERED BY (seller_id) into 2 buckets;", t1);
    sql("alter table %s set TBLPROPERTIES ('write.distribution-mode'='hash');", t1);
    sql("create table %s (id int, level int) USING parquet", t2);

    sql("insert into %s values (10, 100, 1), (20, 200, 2)", t1);
    sql("insert into %s values (1, 99), (3, 255)", t2);

    sql("set spark.sql.autoBroadcastJoinThreshold=-1");
    sql("set spark.sql.shuffle.partitions=2");

    ImmutableList<Object[]> expectedRows = ImmutableList.of(
        row(1, 99, 10, 100, 1)
    );
    assertEquals("Should have expected rows", expectedRows,
        sql("select * from %s as t2 join %s as t1 where t2.id = t1.seller_id " +
            "order by t2.id;", t2, t1));
  }

Can you please help double check and confirm?

Thanks,
Manu
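P.S. To make the suspected difference concrete, here is a minimal, self-contained sketch (not actual Iceberg or Spark code; the class and method names are mine). It assumes Iceberg's bucket transform computes (hash & Integer.MAX_VALUE) % numBuckets with Murmur3 seed 0 over the int widened to a long, and Spark's HashPartitioning computes pmod(murmur3(value, seed=42), numPartitions) over the raw int; Guava is used on both sides here purely to illustrate the seed/width difference, whereas Spark really uses its own Murmur3_x86_32 implementation.

  import com.google.common.hash.Hashing;

  public class HashMismatchSketch {

    // Roughly what Iceberg's bucket transform does for an int column:
    // Murmur3_32 with seed 0 over the value widened to a long, then
    // (hash & Integer.MAX_VALUE) % numBuckets.
    static int icebergBucket(int value, int numBuckets) {
      int hash = Hashing.murmur3_32(0).hashLong((long) value).asInt();
      return (hash & Integer.MAX_VALUE) % numBuckets;
    }

    // Roughly what Spark's HashPartitioning does for an int column:
    // Murmur3 with seed 42 over the 4-byte int, then pmod by the partition count.
    static int sparkPartition(int value, int numPartitions) {
      int hash = Hashing.murmur3_32(42).hashInt(value).asInt();
      return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
      for (int sellerId : new int[] {1, 2}) {
        System.out.printf("seller_id=%d -> iceberg bucket=%d, spark partition=%d%n",
            sellerId, icebergBucket(sellerId, 2), sparkPartition(sellerId, 2));
      }
    }
  }

Whenever the two columns disagree for a given key, rows with that key end up in different tasks on the two sides of the SortMergeJoin and never meet, which would explain the empty result in the test above.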