Iceberg and Spark hash functions are not compatible, just as Hive and Spark hash functions are not compatible. That’s why the new storage-partitioned join (SPJ) framework depends on the function catalog.
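For context, with Spark 3.3+ an Iceberg catalog also acts as a Spark FunctionCatalog, which is how SPJ can evaluate Iceberg's transforms on the Spark side instead of relying on Spark's own hash. A rough sketch in the style of the test below (the catalog name is hypothetical, and system.bucket(numBuckets, value) is my reading of the Iceberg function docs, not something I've verified against 3.4):

    // Assumes an Iceberg catalog registered as "my_catalog" (hypothetical name)
    // that implements Spark's FunctionCatalog. Iceberg exposes its bucket
    // transform as a catalog function, so Spark can compute the same bucket
    // Iceberg used at write time:
    sql("SELECT seller_id, my_catalog.system.bucket(2, seller_id) FROM my_catalog.db.t1");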
- Anton

> On Apr 18, 2023, at 7:09 PM, Manu Zhang <owenzhang1...@gmail.com> wrote:
>
> Hi All,
>
> Since there had been no bucket join in Spark DSv2 until storage-partitioned
> join was added in 3.4.0, we've implemented our own in Iceberg.
>
> We found an issue when joining an Iceberg table with a Spark parquet table,
> as follows.
>
> The left side is an Iceberg table with hash distribution, clustered into two
> buckets. The right side is a parquet table with no buckets or partitions.
> When joining the two tables with SortMergeJoin and
> spark.sql.shuffle.partitions=2, there is no exchange on the left side, since
> Spark believes the distribution is already satisfied, while the right side
> is exchanged to match the number of shuffle partitions.
> This simple test surprisingly fails with no joined results.
>
> I dug into it and found that the partition keys used when creating the
> Iceberg table and when shuffling the right side were different.
> That brought me to the hash functions in Iceberg
> <https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/util/BucketUtil.java#L34>
> and Spark
> <https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L585>.
> Both are Murmur3 hash functions (Iceberg uses Guava's, Spark ships its own
> implementation), but they differ in several places at a glance: Iceberg uses
> the default seed 0 while Spark uses 42, and Iceberg widens an integer to a
> long before hashing while Spark hashes the integer as is.
> Since there is no issue when joining two bucketed tables, the data
> incorrectness in this case was likely caused by the inconsistency between
> the Iceberg and Spark hash functions.
>
> @Test
> public void testBucketJoinOneSide() {
>   String t1 = tableName + "_t1";
>   String t2 = tableName + "_t2";
>
>   sql("create table %s (id int, price int, seller_id int) " +
>       "USING iceberg CLUSTERED BY (seller_id) into 2 buckets;", t1);
>   sql("alter table %s set TBLPROPERTIES ('write.distribution-mode'='hash');", t1);
>   sql("create table %s (id int, level int) USING parquet", t2);
>
>   sql("insert into %s values (10, 100, 1), (20, 200, 2)", t1);
>   sql("insert into %s values (1, 99), (3, 255)", t2);
>
>   sql("set spark.sql.autoBroadcastJoinThreshold=-1");
>   sql("set spark.sql.shuffle.partitions=2");
>
>   ImmutableList<Object[]> expectedRows = ImmutableList.of(
>       row(1, 99, 10, 100, 1)
>   );
>   assertEquals("Should have expected rows",
>       expectedRows,
>       sql("select * from %s as t2 join %s as t1 where t2.id = t1.seller_id " +
>           "order by t2.id;",
>           t2, t1));
> }
>
> Can you please help double check and confirm?
>
> Thanks,
> Manu
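For reference, the mismatch Manu describes is easy to reproduce outside Spark. A minimal sketch with Guava's Murmur3 follows; one caveat is that Spark ships its own Murmur3_x86_32 rather than using Guava, so seeding Guava's murmur3_32 with 42 and hashing the int directly only approximates Spark's behavior, and the bucket/partition formulas reflect my reading of the Iceberg spec and Spark's pmod-based hash partitioning:

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;

    public class HashMismatchSketch {
      public static void main(String[] args) {
        int sellerId = 1;   // key value from the test above
        int numBuckets = 2;

        // Iceberg (BucketUtil): Murmur3 with the default seed 0,
        // and the int is widened to a long before hashing.
        HashFunction icebergMurmur = Hashing.murmur3_32(0);
        int icebergHash = icebergMurmur.hashLong((long) sellerId).asInt();
        // Iceberg spec: bucket = (hash & Integer.MAX_VALUE) % N
        int icebergBucket = (icebergHash & Integer.MAX_VALUE) % numBuckets;

        // Spark: Murmur3 with seed 42, int hashed as 4 bytes as-is
        // (approximated here with Guava; Spark uses its own Murmur3_x86_32).
        HashFunction sparkMurmur = Hashing.murmur3_32(42);
        int sparkHash = sparkMurmur.hashInt(sellerId).asInt();
        // HashPartitioning: partition = pmod(hash, numPartitions)
        int sparkPartition = Math.floorMod(sparkHash, numBuckets);

        System.out.printf("Iceberg: hash=%d, bucket=%d%n", icebergHash, icebergBucket);
        System.out.printf("Spark:   hash=%d, partition=%d%n", sparkHash, sparkPartition);
      }
    }

Whenever icebergBucket and sparkPartition disagree for a key, the matching rows from the two sides land in different tasks, so the SortMergeJoin finds no matches, which is consistent with the empty result in the test.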