Iceberg and Spark hash functions are not compatible, just like Hive and Spark 
hash functions are not compatible. That's why the new storage-partitioned 
join (SPJ) framework depends on the function catalog.
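
As a quick sketch of why that matters (mine, not an official example; it 
assumes Spark 3.3+ with an iceberg-spark-runtime jar on the classpath and an 
Iceberg catalog named "demo" backed by a local warehouse), you can ask Spark 
to evaluate Iceberg's bucket transform through the function catalog and 
compare it with Spark's own Murmur3-based hash:

    import org.apache.spark.sql.SparkSession;

    public class FunctionCatalogSketch {
      public static void main(String[] args) {
        // Assumed setup: local session with an Iceberg catalog named "demo".
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.demo.type", "hadoop")
            .config("spark.sql.catalog.demo.warehouse", "/tmp/iceberg-demo")
            .getOrCreate();

        // demo.system.bucket(2, 1) is Iceberg's bucket transform resolved via
        // the function catalog; pmod(hash(1), 2) is the partition Spark's own
        // Murmur3 hash (seed 42) would pick with 2 shuffle partitions.
        spark.sql("SELECT demo.system.bucket(2, 1) AS iceberg_bucket, "
            + "pmod(hash(1), 2) AS spark_bucket").show();
      }
    }

The two columns generally disagree; exposing the transform through the 
function catalog is what lets SPJ plan both sides with the same function.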

- Anton

> On Apr 18, 2023, at 7:09 PM, Manu Zhang <owenzhang1...@gmail.com> wrote:
> 
> Hi All,
> 
> Since Spark DSv2 had no bucket join until storage-partitioned join was 
> added in 3.4.0, we implemented our own in Iceberg. 
> 
> We found an issue when joining an Iceberg table with a Spark Parquet table, 
> as follows.
> 
> The left side is an Iceberg table with hash distribution, clustered into 
> two buckets. The right side is a Parquet table with no buckets or partitions. 
> When joining the two tables with SortMergeJoin and 
> spark.sql.shuffle.partitions=2, there is no exchange on the left side, since 
> Spark believes its distribution is already satisfied, while the right side is 
> exchanged to match the number of shuffle partitions.
> This simple test surprisingly fails, producing no joined results.
> I dug into it and found that the partition keys used when creating the 
> Iceberg table and when shuffling the right side were different.
> That brought me to the hash functions in Iceberg 
> <https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/util/BucketUtil.java#L34>
>  and Spark 
> <https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L585>.
>  Both are Murmur3 hash functions, but at a glance they differ in several 
> places: Iceberg uses the default seed 0 while Spark uses 42, and Iceberg 
> hashes an int by first widening it to a long while Spark hashes the int 
> as-is.
> Since there is no issue when joining two bucketed Iceberg tables, the 
> incorrect results in this case were likely caused by the inconsistency 
> between the Iceberg and Spark hash functions.
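> 
> To make that concrete, here is a small sketch (mine; the bucket formula and 
> the pmod are my reading of Iceberg's bucket transform and Spark's 
> HashPartitioning) that routes both seller_id values through each side's 
> hash into 2 buckets/partitions:
> 
>     import org.apache.iceberg.util.BucketUtil;
>     import org.apache.spark.unsafe.hash.Murmur3_x86_32;
> 
>     public class BucketRoutingSketch {
>       public static void main(String[] args) {
>         for (int sellerId : new int[] {1, 2}) {
>           // Iceberg: Murmur3 (seed 0) over the int widened to a long,
>           // then positive modulo over the bucket count
>           int icebergBucket = (BucketUtil.hash(sellerId) & Integer.MAX_VALUE) % 2;
>           // Spark: Murmur3 (seed 42) over the 4-byte int, then pmod by
>           // the shuffle partition count
>           int sparkPartition =
>               Math.floorMod(Murmur3_x86_32.hashInt(sellerId, 42), 2);
>           System.out.printf("seller_id=%d iceberg=%d spark=%d%n",
>               sellerId, icebergBucket, sparkPartition);
>         }
>       }
>     }
> 
> Whenever the two columns disagree for a key, its matching rows land in 
> different partitions of the SortMergeJoin, which would explain the empty 
> result.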
> 
>   @Test
>   public void testBucketJoinOneSide() {
>     String t1 = tableName + "_t1";
>     String t2 = tableName + "_t2";
> 
>     sql("create table %s (id int, price int, seller_id int) " +
>         "USING iceberg CLUSTERED BY (seller_id) into 2 buckets;", t1);
>     sql("alter table %s set TBLPROPERTIES 
> ('write.distribution-mode'='hash');", t1);
>     sql("create table %s (id int, level int) USING parquet", t2);
> 
>     sql("insert into %s values (10, 100, 1), (20, 200, 2)", t1);
>     sql("insert into %s values (1, 99), (3, 255)", t2);
> 
>     sql("set spark.sql.autoBroadcastJoinThreshold=-1");
>     sql("set spark.sql.shuffle.partitions=2");
> 
>     ImmutableList<Object[]> expectedRows = ImmutableList.of(
>         row(1, 99, 10, 100, 1)
>     );
>     assertEquals("Should have expected rows",
>         expectedRows,
>         sql("select * from %s as t2 join %s as t1 where t2.id <http://t2.id/> 
> = t1.seller_id " +
>                 "order by t2.id <http://t2.id/>;",
>             t2, t1));
>   }
> 
> Can you please help double-check and confirm?
> 
> Thanks,
> Manu
