Hi All,

Since there had been no bucket join in Spark DSv2 until storage-partitioned
join was added in 3.4.0, we've implemented our own in Iceberg.

We found an issue when joining an Iceberg table with a Spark parquet table,
as follows.


   1. The left side is an Iceberg table that uses hash distribution and is
   clustered into two buckets. The right side is a parquet table with no
   buckets or partitions.
   2. When joining the two tables with SortMergeJoin and
   spark.sql.shuffle.partitions=2, there's no exchange on the left side,
   since Spark believes the distribution is already satisfied, while the
   right side is exchanged to match the number of shuffle partitions.
   3. The simple test below unexpectedly fails: the join returns no rows.
   4. I dug into it and found that the partition keys used when creating
   the Iceberg table differ from those used when shuffling the right side.
   5. That led me to the hash functions in Iceberg
   <https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/util/BucketUtil.java#L34>
   and Spark
   <https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L585>.
   Both are Murmur3 hash functions, but at a glance they differ in several
   places. Iceberg uses the default seed 0, while Spark uses 42. Iceberg
   hashes an integer as a long, while Spark hashes an integer as is.
   6. Since there's no issue when joining two bucketed tables, the
   incorrect result in this case is likely caused by the inconsistency of
   the hash functions between Iceberg and Spark.
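
To make points 5 and 6 concrete, here is a minimal standalone sketch of the
two hashing schemes as I understand them. It is not the actual Iceberg or
Spark code: the class and method names (HashMismatchSketch, icebergBucket,
sparkPartition) are made up for illustration, the Murmur3 32-bit routine is
written out by hand so that no dependencies are needed, and the bucket and
partition formulas (sign-bit mask and modulo for Iceberg, non-negative
modulo for Spark) are my reading of the two code bases.

```java
public class HashMismatchSketch {
  private static final int C1 = 0xcc9e2d51;
  private static final int C2 = 0x1b873593;

  // Standard Murmur3 x86_32 building blocks.
  private static int mixK1(int k1) {
    k1 *= C1;
    k1 = Integer.rotateLeft(k1, 15);
    k1 *= C2;
    return k1;
  }

  private static int mixH1(int h1, int k1) {
    h1 ^= k1;
    h1 = Integer.rotateLeft(h1, 13);
    return h1 * 5 + 0xe6546b64;
  }

  private static int fmix(int h1, int length) {
    h1 ^= length;
    h1 ^= h1 >>> 16;
    h1 *= 0x85ebca6b;
    h1 ^= h1 >>> 13;
    h1 *= 0xc2b2ae35;
    h1 ^= h1 >>> 16;
    return h1;
  }

  // Hash a 4-byte int with the given seed.
  static int hashInt(int value, int seed) {
    return fmix(mixH1(seed, mixK1(value)), 4);
  }

  // Hash an 8-byte long with the given seed (low word first, then high word).
  static int hashLong(long value, int seed) {
    int low = (int) value;
    int high = (int) (value >>> 32);
    int h1 = mixH1(seed, mixK1(low));
    h1 = mixH1(h1, mixK1(high));
    return fmix(h1, 8);
  }

  // Assumed Iceberg-style bucketing: widen the int to a long, seed 0,
  // clear the sign bit, then take the remainder.
  static int icebergBucket(int value, int numBuckets) {
    return (hashLong((long) value, 0) & Integer.MAX_VALUE) % numBuckets;
  }

  // Assumed Spark-style shuffle partitioning: hash the int as is, seed 42,
  // then take a non-negative modulo.
  static int sparkPartition(int value, int numPartitions) {
    return Math.floorMod(hashInt(value, 42), numPartitions);
  }

  public static void main(String[] args) {
    // seller_id values used in the test
    for (int sellerId : new int[] {1, 2}) {
      System.out.printf("seller_id=%d icebergBucket=%d sparkPartition=%d%n",
          sellerId, icebergBucket(sellerId, 2), sparkPartition(sellerId, 2));
    }
  }
}
```

If the two columns printed for the same key differ, matching rows end up in
different join partitions, which would explain the empty result.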


  @Test
  public void testBucketJoinOneSide() {
    String t1 = tableName + "_t1";
    String t2 = tableName + "_t2";

    sql("create table %s (id int, price int, seller_id int) " +
        "USING iceberg CLUSTERED BY (seller_id) into 2 buckets;", t1);
    sql("alter table %s set TBLPROPERTIES " +
        "('write.distribution-mode'='hash');", t1);
    sql("create table %s (id int, level int) USING parquet", t2);

    sql("insert into %s values (10, 100, 1), (20, 200, 2)", t1);
    sql("insert into %s values (1, 99), (3, 255)", t2);

    sql("set spark.sql.autoBroadcastJoinThreshold=-1");
    sql("set spark.sql.shuffle.partitions=2");

    ImmutableList<Object[]> expectedRows = ImmutableList.of(
        row(1, 99, 10, 100, 1)
    );
    assertEquals("Should have expected rows",
        expectedRows,
        sql("select * from %s as t2 join %s as t1 where t2.id = t1.seller_id " +
                "order by t2.id;",
            t2, t1));
  }


Can you please help double check and confirm?

Thanks,
Manu
